use std::fmt::{self, Display, Formatter};
use datasize::DataSize;
use derive_more::From;
use prometheus::Registry;
use serde::Serialize;
use thiserror::Error;
use crate::{
components::{
chainspec_loader::{self, ChainspecLoader},
contract_runtime::{self, ContractRuntime},
storage::{self, Storage},
Component,
},
effect::{
requests::{ContractRuntimeRequest, NetworkRequest, StorageRequest},
EffectBuilder, Effects,
},
protocol::Message,
reactor::{self, validator, EventQueueHandle},
types::NodeId,
utils::WithDir,
NodeRng,
};
#[derive(Debug, From, Serialize)]
#[must_use]
pub enum Event {
#[from]
Chainspec(chainspec_loader::Event),
#[from]
Storage(#[serde(skip_serializing)] storage::Event),
#[from]
ContractRuntime(contract_runtime::Event),
}
impl From<StorageRequest> for Event {
fn from(request: StorageRequest) -> Self {
Event::Storage(storage::Event::StorageRequest(request))
}
}
impl From<ContractRuntimeRequest> for Event {
fn from(request: ContractRuntimeRequest) -> Self {
Event::ContractRuntime(contract_runtime::Event::Request(request))
}
}
impl From<NetworkRequest<NodeId, Message>> for Event {
fn from(_request: NetworkRequest<NodeId, Message>) -> Self {
unreachable!("no network traffic happens during initialization")
}
}
impl Display for Event {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::Chainspec(event) => write!(formatter, "chainspec: {}", event),
Event::Storage(event) => write!(formatter, "storage: {}", event),
Event::ContractRuntime(event) => write!(formatter, "contract runtime: {}", event),
}
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("config error: {0}")]
ConfigError(String),
#[error("prometheus (metrics) error: {0}")]
Metrics(#[from] prometheus::Error),
#[error("chainspec error: {0}")]
Chainspec(#[from] chainspec_loader::Error),
#[error("storage error: {0}")]
Storage(#[from] storage::Error),
#[error("contract runtime config error: {0}")]
ContractRuntime(#[from] contract_runtime::ConfigError),
}
#[derive(DataSize, Debug)]
pub struct Reactor {
pub(super) config: WithDir<validator::Config>,
pub(super) chainspec_loader: ChainspecLoader,
pub(super) storage: Storage,
pub(super) contract_runtime: ContractRuntime,
}
impl Reactor {
pub fn stopped_successfully(&self) -> bool {
self.chainspec_loader.stopped_successfully()
}
}
impl reactor::Reactor for Reactor {
type Event = Event;
type Config = WithDir<validator::Config>;
type Error = Error;
fn new(
config: Self::Config,
registry: &Registry,
event_queue: EventQueueHandle<Self::Event>,
_rng: &mut NodeRng,
) -> Result<(Self, Effects<Self::Event>), Error> {
let chainspec = config
.value()
.node
.chainspec_config_path
.clone()
.load(config.dir())
.map_err(|err| Error::ConfigError(err.to_string()))?;
chainspec.validate_config();
let effect_builder = EffectBuilder::new(event_queue);
let storage_config = config.map_ref(|cfg| cfg.storage.clone());
let storage = Storage::new(&storage_config)?;
let contract_runtime =
ContractRuntime::new(storage_config, &config.value().contract_runtime, registry)?;
let (chainspec_loader, chainspec_effects) = ChainspecLoader::new(chainspec, effect_builder);
let effects = reactor::wrap_effects(Event::Chainspec, chainspec_effects);
Ok((
Reactor {
config,
chainspec_loader,
storage,
contract_runtime,
},
effects,
))
}
fn dispatch_event(
&mut self,
effect_builder: EffectBuilder<Self::Event>,
rng: &mut NodeRng,
event: Event,
) -> Effects<Self::Event> {
match event {
Event::Chainspec(event) => reactor::wrap_effects(
Event::Chainspec,
self.chainspec_loader
.handle_event(effect_builder, rng, event),
),
Event::Storage(event) => reactor::wrap_effects(
Event::Storage,
self.storage.handle_event(effect_builder, rng, event),
),
Event::ContractRuntime(event) => reactor::wrap_effects(
Event::ContractRuntime,
self.contract_runtime
.handle_event(effect_builder, rng, event),
),
}
}
fn is_stopped(&mut self) -> bool {
self.chainspec_loader.is_stopped()
}
}