mod config;
mod error;
mod memory_metrics;
#[cfg(test)]
mod tests;
use std::{
cmp,
fmt::{self, Debug, Display, Formatter},
str::FromStr,
};
use datasize::DataSize;
use derive_more::From;
use prometheus::Registry;
use tracing::{debug, error, warn};
use block_proposer::BlockProposerState;
#[cfg(test)]
use crate::testing::network::NetworkedReactor;
use crate::{
components::{
api_server::{self, ApiServer},
block_executor::{self, BlockExecutor},
block_proposer::{self, BlockProposer},
block_validator::{self, BlockValidator},
chainspec_loader::{self, ChainspecLoader},
consensus::{self, EraSupervisor},
contract_runtime::{self, ContractRuntime},
deploy_acceptor::{self, DeployAcceptor},
fetcher::{self, Fetcher},
gossiper::{self, Gossiper},
linear_chain,
metrics::Metrics,
small_network::{self, GossipedAddress, NodeId, SmallNetwork},
storage::{self, Storage},
Component,
},
effect::{
announcements::{
ApiServerAnnouncement, BlockExecutorAnnouncement, ConsensusAnnouncement,
DeployAcceptorAnnouncement, GossiperAnnouncement, LinearChainAnnouncement,
NetworkAnnouncement,
},
requests::{
ApiRequest, BlockExecutorRequest, BlockProposerRequest, BlockValidationRequest,
ChainspecLoaderRequest, ConsensusRequest, ContractRuntimeRequest, FetcherRequest,
LinearChainRequest, MetricsRequest, NetworkInfoRequest, NetworkRequest, StorageRequest,
},
EffectBuilder, EffectExt, Effects,
},
protocol::Message,
reactor::{self, event_queue_metrics::EventQueueMetrics, EventQueueHandle},
types::{Block, CryptoRngCore, Deploy, ProtoBlock, Tag, TimeDiff, Timestamp},
utils::Source,
};
pub use config::Config;
pub use error::Error;
use linear_chain::LinearChain;
use memory_metrics::MemoryMetrics;
#[derive(Debug, From)]
#[must_use]
pub enum Event {
#[from]
Network(small_network::Event<Message>),
#[from]
BlockProposer(block_proposer::Event),
#[from]
Storage(storage::Event<Storage>),
#[from]
ApiServer(api_server::Event),
#[from]
ChainspecLoader(chainspec_loader::Event),
#[from]
Consensus(consensus::Event<NodeId>),
#[from]
DeployAcceptor(deploy_acceptor::Event),
#[from]
DeployFetcher(fetcher::Event<Deploy>),
#[from]
DeployGossiper(gossiper::Event<Deploy>),
#[from]
AddressGossiper(gossiper::Event<GossipedAddress>),
#[from]
ContractRuntime(contract_runtime::Event),
#[from]
BlockExecutor(block_executor::Event),
#[from]
ProtoBlockValidator(block_validator::Event<ProtoBlock, NodeId>),
#[from]
LinearChain(linear_chain::Event<NodeId>),
#[from]
NetworkRequest(NetworkRequest<NodeId, Message>),
#[from]
NetworkInfoRequest(NetworkInfoRequest<NodeId>),
#[from]
DeployFetcherRequest(FetcherRequest<NodeId, Deploy>),
#[from]
BlockProposerRequest(BlockProposerRequest),
#[from]
BlockExecutorRequest(BlockExecutorRequest),
#[from]
ProtoBlockValidatorRequest(BlockValidationRequest<ProtoBlock, NodeId>),
#[from]
MetricsRequest(MetricsRequest),
#[from]
ChainspecLoaderRequest(ChainspecLoaderRequest),
#[from]
NetworkAnnouncement(NetworkAnnouncement<NodeId, Message>),
#[from]
ApiServerAnnouncement(ApiServerAnnouncement),
#[from]
DeployAcceptorAnnouncement(DeployAcceptorAnnouncement<NodeId>),
#[from]
ConsensusAnnouncement(ConsensusAnnouncement),
#[from]
BlockExecutorAnnouncement(BlockExecutorAnnouncement),
#[from]
DeployGossiperAnnouncement(GossiperAnnouncement<Deploy>),
#[from]
AddressGossiperAnnouncement(GossiperAnnouncement<GossipedAddress>),
#[from]
LinearChainAnnouncement(LinearChainAnnouncement),
}
impl From<StorageRequest<Storage>> for Event {
fn from(request: StorageRequest<Storage>) -> Self {
Event::Storage(storage::Event::Request(request))
}
}
impl From<ApiRequest<NodeId>> for Event {
fn from(request: ApiRequest<NodeId>) -> Self {
Event::ApiServer(api_server::Event::ApiRequest(request))
}
}
impl From<NetworkRequest<NodeId, consensus::ConsensusMessage>> for Event {
fn from(request: NetworkRequest<NodeId, consensus::ConsensusMessage>) -> Self {
Event::NetworkRequest(request.map_payload(Message::from))
}
}
impl From<NetworkRequest<NodeId, gossiper::Message<Deploy>>> for Event {
fn from(request: NetworkRequest<NodeId, gossiper::Message<Deploy>>) -> Self {
Event::NetworkRequest(request.map_payload(Message::from))
}
}
impl From<NetworkRequest<NodeId, gossiper::Message<GossipedAddress>>> for Event {
fn from(request: NetworkRequest<NodeId, gossiper::Message<GossipedAddress>>) -> Self {
Event::NetworkRequest(request.map_payload(Message::from))
}
}
impl From<ContractRuntimeRequest> for Event {
fn from(request: ContractRuntimeRequest) -> Event {
Event::ContractRuntime(contract_runtime::Event::Request(request))
}
}
impl From<ConsensusRequest> for Event {
fn from(request: ConsensusRequest) -> Self {
Event::Consensus(consensus::Event::ConsensusRequest(request))
}
}
impl From<LinearChainRequest<NodeId>> for Event {
fn from(request: LinearChainRequest<NodeId>) -> Self {
Event::LinearChain(linear_chain::Event::Request(request))
}
}
impl Display for Event {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::Network(event) => write!(f, "network: {}", event),
Event::BlockProposer(event) => write!(f, "block proposer: {}", event),
Event::Storage(event) => write!(f, "storage: {}", event),
Event::ApiServer(event) => write!(f, "api server: {}", event),
Event::ChainspecLoader(event) => write!(f, "chainspec loader: {}", event),
Event::Consensus(event) => write!(f, "consensus: {}", event),
Event::DeployAcceptor(event) => write!(f, "deploy acceptor: {}", event),
Event::DeployFetcher(event) => write!(f, "deploy fetcher: {}", event),
Event::DeployGossiper(event) => write!(f, "deploy gossiper: {}", event),
Event::AddressGossiper(event) => write!(f, "address gossiper: {}", event),
Event::ContractRuntime(event) => write!(f, "contract runtime: {}", event),
Event::BlockExecutor(event) => write!(f, "block executor: {}", event),
Event::LinearChain(event) => write!(f, "linear-chain event {}", event),
Event::ProtoBlockValidator(event) => write!(f, "block validator: {}", event),
Event::NetworkRequest(req) => write!(f, "network request: {}", req),
Event::NetworkInfoRequest(req) => write!(f, "network info request: {}", req),
Event::ChainspecLoaderRequest(req) => write!(f, "chainspec loader request: {}", req),
Event::DeployFetcherRequest(req) => write!(f, "deploy fetcher request: {}", req),
Event::BlockProposerRequest(req) => write!(f, "block proposer request: {}", req),
Event::BlockExecutorRequest(req) => write!(f, "block executor request: {}", req),
Event::ProtoBlockValidatorRequest(req) => write!(f, "block validator request: {}", req),
Event::MetricsRequest(req) => write!(f, "metrics request: {}", req),
Event::NetworkAnnouncement(ann) => write!(f, "network announcement: {}", ann),
Event::ApiServerAnnouncement(ann) => write!(f, "api server announcement: {}", ann),
Event::DeployAcceptorAnnouncement(ann) => {
write!(f, "deploy acceptor announcement: {}", ann)
}
Event::ConsensusAnnouncement(ann) => write!(f, "consensus announcement: {}", ann),
Event::BlockExecutorAnnouncement(ann) => {
write!(f, "block-executor announcement: {}", ann)
}
Event::DeployGossiperAnnouncement(ann) => {
write!(f, "deploy gossiper announcement: {}", ann)
}
Event::AddressGossiperAnnouncement(ann) => {
write!(f, "address gossiper announcement: {}", ann)
}
Event::LinearChainAnnouncement(ann) => write!(f, "linear chain announcement: {}", ann),
}
}
}
pub struct ValidatorInitConfig {
pub(super) config: Config,
pub(super) chainspec_loader: ChainspecLoader,
pub(super) storage: Storage,
pub(super) contract_runtime: ContractRuntime,
pub(super) consensus: EraSupervisor<NodeId>,
pub(super) init_consensus_effects: Effects<consensus::Event<NodeId>>,
pub(super) linear_chain: Vec<Block>,
pub(super) block_proposer_state: BlockProposerState,
}
#[derive(DataSize, Debug)]
pub struct Reactor {
metrics: Metrics,
net: SmallNetwork<Event, Message>,
address_gossiper: Gossiper<GossipedAddress, Event>,
storage: Storage,
contract_runtime: ContractRuntime,
api_server: ApiServer,
chainspec_loader: ChainspecLoader,
consensus: EraSupervisor<NodeId>,
#[data_size(skip)]
deploy_acceptor: DeployAcceptor,
deploy_fetcher: Fetcher<Deploy>,
deploy_gossiper: Gossiper<Deploy, Event>,
block_proposer: BlockProposer,
block_executor: BlockExecutor,
proto_block_validator: BlockValidator<ProtoBlock, NodeId>,
linear_chain: LinearChain<NodeId>,
#[data_size(skip)]
memory_metrics: MemoryMetrics,
#[data_size(skip)]
event_queue_metrics: EventQueueMetrics,
}
#[cfg(test)]
impl Reactor {
pub(crate) fn consensus(&self) -> &EraSupervisor<NodeId> {
&self.consensus
}
}
impl reactor::Reactor for Reactor {
type Event = Event;
type Config = ValidatorInitConfig;
type Error = Error;
fn new(
config: Self::Config,
registry: &Registry,
event_queue: EventQueueHandle<Self::Event>,
_rng: &mut dyn CryptoRngCore,
) -> Result<(Self, Effects<Event>), Error> {
let ValidatorInitConfig {
config,
chainspec_loader,
storage,
contract_runtime,
consensus,
init_consensus_effects,
linear_chain,
block_proposer_state,
} = config;
let memory_metrics = MemoryMetrics::new(registry.clone())?;
let event_queue_metrics = EventQueueMetrics::new(registry.clone(), event_queue)?;
let metrics = Metrics::new(registry.clone());
let effect_builder = EffectBuilder::new(event_queue);
let (net, net_effects) = SmallNetwork::new(event_queue, config.network, true)?;
let address_gossiper =
Gossiper::new_for_complete_items("address_gossiper", config.gossip, registry)?;
let api_server = ApiServer::new(config.http_server, effect_builder);
let deploy_acceptor = DeployAcceptor::new();
let deploy_fetcher = Fetcher::new(config.fetcher);
let deploy_gossiper = Gossiper::new_for_partial_items(
"deploy_gossiper",
config.gossip,
gossiper::get_deploy_from_storage::<Deploy, Event>,
registry,
)?;
let (block_proposer, block_proposer_effects) =
BlockProposer::new(registry.clone(), effect_builder, block_proposer_state)?;
let mut effects = reactor::wrap_effects(Event::BlockProposer, block_proposer_effects);
let genesis_state_root_hash = chainspec_loader
.genesis_state_root_hash()
.expect("should have state root hash");
let block_executor = BlockExecutor::new(genesis_state_root_hash)
.with_parent_map(linear_chain.last().cloned());
let proto_block_validator = BlockValidator::new();
let linear_chain = LinearChain::new();
effects.extend(reactor::wrap_effects(Event::Network, net_effects));
effects.extend(reactor::wrap_effects(
Event::Consensus,
init_consensus_effects,
));
let now = Timestamp::now();
let five_minutes = TimeDiff::from_str("5minutes").unwrap();
let later_timestamp = cmp::max(
now,
chainspec_loader
.chainspec()
.genesis
.highway_config
.genesis_era_start_timestamp,
);
let timer_duration = later_timestamp + five_minutes - now;
effects.extend(reactor::wrap_effects(
Event::Consensus,
effect_builder
.set_timeout(timer_duration.into())
.event(|_| consensus::Event::Shutdown),
));
Ok((
Reactor {
metrics,
net,
address_gossiper,
storage,
contract_runtime,
api_server,
chainspec_loader,
consensus,
deploy_acceptor,
deploy_fetcher,
deploy_gossiper,
block_proposer,
block_executor,
proto_block_validator,
linear_chain,
memory_metrics,
event_queue_metrics,
},
effects,
))
}
fn dispatch_event(
&mut self,
effect_builder: EffectBuilder<Self::Event>,
rng: &mut dyn CryptoRngCore,
event: Event,
) -> Effects<Self::Event> {
match event {
Event::Network(event) => reactor::wrap_effects(
Event::Network,
self.net.handle_event(effect_builder, rng, event),
),
Event::BlockProposer(event) => reactor::wrap_effects(
Event::BlockProposer,
self.block_proposer.handle_event(effect_builder, rng, event),
),
Event::Storage(event) => reactor::wrap_effects(
Event::Storage,
self.storage.handle_event(effect_builder, rng, event),
),
Event::ApiServer(event) => reactor::wrap_effects(
Event::ApiServer,
self.api_server.handle_event(effect_builder, rng, event),
),
Event::ChainspecLoader(event) => reactor::wrap_effects(
Event::ChainspecLoader,
self.chainspec_loader
.handle_event(effect_builder, rng, event),
),
Event::Consensus(event) => reactor::wrap_effects(
Event::Consensus,
self.consensus.handle_event(effect_builder, rng, event),
),
Event::DeployAcceptor(event) => reactor::wrap_effects(
Event::DeployAcceptor,
self.deploy_acceptor
.handle_event(effect_builder, rng, event),
),
Event::DeployFetcher(event) => reactor::wrap_effects(
Event::DeployFetcher,
self.deploy_fetcher.handle_event(effect_builder, rng, event),
),
Event::DeployGossiper(event) => reactor::wrap_effects(
Event::DeployGossiper,
self.deploy_gossiper
.handle_event(effect_builder, rng, event),
),
Event::AddressGossiper(event) => reactor::wrap_effects(
Event::AddressGossiper,
self.address_gossiper
.handle_event(effect_builder, rng, event),
),
Event::ContractRuntime(event) => reactor::wrap_effects(
Event::ContractRuntime,
self.contract_runtime
.handle_event(effect_builder, rng, event),
),
Event::BlockExecutor(event) => reactor::wrap_effects(
Event::BlockExecutor,
self.block_executor.handle_event(effect_builder, rng, event),
),
Event::ProtoBlockValidator(event) => reactor::wrap_effects(
Event::ProtoBlockValidator,
self.proto_block_validator
.handle_event(effect_builder, rng, event),
),
Event::LinearChain(event) => reactor::wrap_effects(
Event::LinearChain,
self.linear_chain.handle_event(effect_builder, rng, event),
),
Event::NetworkRequest(req) => self.dispatch_event(
effect_builder,
rng,
Event::Network(small_network::Event::from(req)),
),
Event::NetworkInfoRequest(req) => self.dispatch_event(
effect_builder,
rng,
Event::Network(small_network::Event::from(req)),
),
Event::DeployFetcherRequest(req) => {
self.dispatch_event(effect_builder, rng, Event::DeployFetcher(req.into()))
}
Event::BlockProposerRequest(req) => {
self.dispatch_event(effect_builder, rng, Event::BlockProposer(req.into()))
}
Event::BlockExecutorRequest(req) => self.dispatch_event(
effect_builder,
rng,
Event::BlockExecutor(block_executor::Event::from(req)),
),
Event::ProtoBlockValidatorRequest(req) => self.dispatch_event(
effect_builder,
rng,
Event::ProtoBlockValidator(block_validator::Event::from(req)),
),
Event::MetricsRequest(req) => reactor::wrap_effects(
Event::MetricsRequest,
self.metrics.handle_event(effect_builder, rng, req),
),
Event::ChainspecLoaderRequest(req) => {
self.dispatch_event(effect_builder, rng, Event::ChainspecLoader(req.into()))
}
Event::NetworkAnnouncement(NetworkAnnouncement::MessageReceived {
sender,
payload,
}) => {
let reactor_event = match payload {
Message::Consensus(msg) => {
Event::Consensus(consensus::Event::MessageReceived { sender, msg })
}
Message::DeployGossiper(message) => {
Event::DeployGossiper(gossiper::Event::MessageReceived { sender, message })
}
Message::AddressGossiper(message) => {
Event::AddressGossiper(gossiper::Event::MessageReceived { sender, message })
}
Message::GetRequest { tag, serialized_id } => match tag {
Tag::Deploy => {
let deploy_hash = match bincode::deserialize(&serialized_id) {
Ok(hash) => hash,
Err(error) => {
error!(
"failed to decode {:?} from {}: {}",
serialized_id, sender, error
);
return Effects::new();
}
};
Event::Storage(storage::Event::GetDeployForPeer {
deploy_hash,
peer: sender,
})
}
Tag::Block => {
let block_hash = match bincode::deserialize(&serialized_id) {
Ok(hash) => hash,
Err(error) => {
error!(
"failed to decode {:?} from {}: {}",
serialized_id, sender, error
);
return Effects::new();
}
};
Event::LinearChain(linear_chain::Event::Request(
LinearChainRequest::BlockRequest(block_hash, sender),
))
}
Tag::BlockByHeight => {
let height = match bincode::deserialize(&serialized_id) {
Ok(block_by_height) => block_by_height,
Err(error) => {
error!(
"failed to decode {:?} from {}: {}",
serialized_id, sender, error
);
return Effects::new();
}
};
Event::LinearChain(linear_chain::Event::Request(
LinearChainRequest::BlockAtHeight(height, sender),
))
}
Tag::GossipedAddress => {
warn!("received get request for gossiped-address from {}", sender);
return Effects::new();
}
},
Message::GetResponse {
tag,
serialized_item,
} => match tag {
Tag::Deploy => {
let deploy = match bincode::deserialize(&serialized_item) {
Ok(deploy) => Box::new(deploy),
Err(error) => {
error!("failed to decode deploy from {}: {}", sender, error);
return Effects::new();
}
};
Event::DeployAcceptor(deploy_acceptor::Event::Accept {
deploy,
source: Source::Peer(sender),
})
}
Tag::Block => todo!("Handle GET block response"),
Tag::BlockByHeight => todo!("Handle GET BlockByHeight response"),
Tag::GossipedAddress => {
warn!("received get request for gossiped-address from {}", sender);
return Effects::new();
}
},
};
self.dispatch_event(effect_builder, rng, reactor_event)
}
Event::NetworkAnnouncement(NetworkAnnouncement::GossipOurAddress(gossiped_address)) => {
let event = gossiper::Event::ItemReceived {
item_id: gossiped_address,
source: Source::<NodeId>::Client,
};
self.dispatch_event(effect_builder, rng, Event::AddressGossiper(event))
}
Event::NetworkAnnouncement(NetworkAnnouncement::NewPeer(peer_id)) => {
debug!(%peer_id, "new peer announcement event ignored (validator reactor does not care)");
Effects::new()
}
Event::ApiServerAnnouncement(ApiServerAnnouncement::DeployReceived { deploy }) => {
let event = deploy_acceptor::Event::Accept {
deploy,
source: Source::<NodeId>::Client,
};
self.dispatch_event(effect_builder, rng, Event::DeployAcceptor(event))
}
Event::DeployAcceptorAnnouncement(DeployAcceptorAnnouncement::AcceptedNewDeploy {
deploy,
source,
}) => {
let event = block_proposer::Event::Buffer {
hash: *deploy.id(),
header: Box::new(deploy.header().clone()),
};
let mut effects =
self.dispatch_event(effect_builder, rng, Event::BlockProposer(event));
let event = gossiper::Event::ItemReceived {
item_id: *deploy.id(),
source,
};
effects.extend(self.dispatch_event(
effect_builder,
rng,
Event::DeployGossiper(event),
));
let event = fetcher::Event::GotRemotely {
item: deploy,
source,
};
effects.extend(self.dispatch_event(
effect_builder,
rng,
Event::DeployFetcher(event),
));
effects
}
Event::DeployAcceptorAnnouncement(DeployAcceptorAnnouncement::InvalidDeploy {
deploy: _,
source: _,
}) => Effects::new(),
Event::ConsensusAnnouncement(consensus_announcement) => {
let mut reactor_event_dispatch = |dbe: block_proposer::Event| {
self.dispatch_event(effect_builder, rng, Event::BlockProposer(dbe))
};
match consensus_announcement {
ConsensusAnnouncement::Proposed(block) => {
reactor_event_dispatch(block_proposer::Event::ProposedProtoBlock(block))
}
ConsensusAnnouncement::Finalized(block) => {
let mut effects = reactor_event_dispatch(
block_proposer::Event::FinalizedProtoBlock(block.proto_block().clone()),
);
let reactor_event =
Event::ApiServer(api_server::Event::BlockFinalized(block));
effects.extend(self.dispatch_event(effect_builder, rng, reactor_event));
effects
}
ConsensusAnnouncement::Orphaned(block) => {
reactor_event_dispatch(block_proposer::Event::OrphanedProtoBlock(block))
}
ConsensusAnnouncement::Handled(_) => {
debug!("Ignoring `Handled` announcement in `validator` reactor.");
Effects::new()
}
}
}
Event::BlockExecutorAnnouncement(BlockExecutorAnnouncement::LinearChainBlock {
block,
execution_results,
}) => {
let mut effects = Effects::new();
let block_hash = *block.hash();
let reactor_event = Event::LinearChain(linear_chain::Event::LinearChainBlock {
block: Box::new(block),
execution_results: execution_results
.iter()
.map(|(hash, (_header, results))| (*hash, results.clone()))
.collect(),
});
effects.extend(self.dispatch_event(effect_builder, rng, reactor_event));
for (deploy_hash, (deploy_header, execution_result)) in execution_results {
let reactor_event = Event::ApiServer(api_server::Event::DeployProcessed {
deploy_hash,
deploy_header: Box::new(deploy_header),
block_hash,
execution_result,
});
effects.extend(self.dispatch_event(effect_builder, rng, reactor_event));
}
effects
}
Event::DeployGossiperAnnouncement(_ann) => {
unreachable!("the deploy gossiper should never make an announcement")
}
Event::AddressGossiperAnnouncement(ann) => {
let GossiperAnnouncement::NewCompleteItem(gossiped_address) = ann;
let reactor_event =
Event::Network(small_network::Event::PeerAddressReceived(gossiped_address));
self.dispatch_event(effect_builder, rng, reactor_event)
}
Event::LinearChainAnnouncement(LinearChainAnnouncement::BlockAdded {
block_hash,
block_header,
}) => {
let reactor_event = Event::ApiServer(api_server::Event::BlockAdded {
block_hash,
block_header,
});
self.dispatch_event(effect_builder, rng, reactor_event)
}
}
}
fn update_metrics(&mut self, event_queue_handle: EventQueueHandle<Self::Event>) {
self.memory_metrics.estimate(&self);
self.event_queue_metrics
.record_event_queue_counts(&event_queue_handle)
}
}
#[cfg(test)]
impl NetworkedReactor for Reactor {
type NodeId = NodeId;
fn node_id(&self) -> Self::NodeId {
self.net.node_id()
}
}