mod config;
mod docs;
mod event;
mod filters;
mod http_server;
mod info;
use std::{net::SocketAddr, sync::Arc};
use datasize::DataSize;
use futures::join;
use once_cell::sync::OnceCell;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{error, info, warn};
#[cfg(test)]
use futures::{future::BoxFuture, FutureExt};
#[cfg(test)]
use tracing::debug;
use casper_types::ProtocolVersion;
use super::{Component, ComponentState, InitializedComponent};
use crate::{
components::PortBoundComponent,
effect::{
requests::{
BlockSynchronizerRequest, ChainspecRawBytesRequest, ConsensusRequest, MetricsRequest,
NetworkInfoRequest, ReactorInfoRequest, RestRequest, StorageRequest,
UpgradeWatcherRequest,
},
EffectBuilder, EffectExt, Effects,
},
reactor::main_reactor::MainEvent,
types::{ChainspecInfo, StatusFeed},
utils::{self, ListeningError},
NodeRng,
};
pub use config::Config;
pub use docs::DocExample;
pub(crate) use docs::DOCS_EXAMPLE_PROTOCOL_VERSION;
pub(crate) use event::Event;
pub(crate) use info::{GetChainspecResult, GetValidatorChangesResult};
const COMPONENT_NAME: &str = "rest_server";
pub(crate) trait ReactorEventT:
From<Event>
+ From<RestRequest>
+ From<NetworkInfoRequest>
+ From<StorageRequest>
+ From<ChainspecRawBytesRequest>
+ From<UpgradeWatcherRequest>
+ From<ConsensusRequest>
+ From<MetricsRequest>
+ From<ReactorInfoRequest>
+ From<BlockSynchronizerRequest>
+ Send
{
}
impl<REv> ReactorEventT for REv where
REv: From<Event>
+ From<RestRequest>
+ From<NetworkInfoRequest>
+ From<StorageRequest>
+ From<ChainspecRawBytesRequest>
+ From<UpgradeWatcherRequest>
+ From<ConsensusRequest>
+ From<MetricsRequest>
+ From<ReactorInfoRequest>
+ From<BlockSynchronizerRequest>
+ Send
+ 'static
{
}
#[derive(DataSize, Debug)]
pub(crate) struct InnerRestServer {
#[data_size(skip)]
#[allow(dead_code)]
shutdown_sender: oneshot::Sender<()>,
local_addr: Arc<OnceCell<SocketAddr>>,
#[data_size(skip)]
#[allow(dead_code)]
server_join_handle: Option<JoinHandle<()>>,
network_name: String,
}
#[derive(DataSize, Debug)]
pub(crate) struct RestServer {
state: ComponentState,
config: Config,
api_version: ProtocolVersion,
network_name: String,
inner_rest: Option<InnerRestServer>,
}
impl RestServer {
pub(crate) fn new(config: Config, api_version: ProtocolVersion, network_name: String) -> Self {
RestServer {
state: ComponentState::Uninitialized,
config,
api_version,
network_name,
inner_rest: None,
}
}
}
impl<REv> Component<REv> for RestServer
where
REv: ReactorEventT,
{
type Event = Event;
fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
_rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
match &self.state {
ComponentState::Fatal(msg) => {
error!(
msg,
?event,
name = <Self as Component<MainEvent>>::name(self),
"should not handle this event when this component has fatal error"
);
Effects::new()
}
ComponentState::Uninitialized => {
warn!(
?event,
name = <Self as Component<MainEvent>>::name(self),
"should not handle this event when component is uninitialized"
);
Effects::new()
}
ComponentState::Initializing => match event {
Event::Initialize => {
let (effects, state) = self.bind(self.config.enable_server, effect_builder);
<Self as InitializedComponent<MainEvent>>::set_state(self, state);
effects
}
Event::RestRequest(_) | Event::GetMetricsResult { .. } => {
warn!(
?event,
name = <Self as Component<MainEvent>>::name(self),
"should not handle this event when component is pending initialization"
);
Effects::new()
}
},
ComponentState::Initialized => match event {
Event::Initialize => {
error!(
?event,
name = <Self as Component<MainEvent>>::name(self),
"component already initialized"
);
Effects::new()
}
Event::RestRequest(RestRequest::Status { responder }) => {
let network_name = self.network_name.clone();
async move {
let (
last_added_block,
peers,
next_upgrade,
consensus_status,
reactor_state,
last_progress,
node_uptime,
available_block_range,
block_sync,
latest_switch_block_header,
) = join!(
effect_builder.get_highest_complete_block_from_storage(),
effect_builder.network_peers(),
effect_builder.get_next_upgrade(),
effect_builder.consensus_status(),
effect_builder.get_reactor_state(),
effect_builder.get_last_progress(),
effect_builder.get_uptime(),
effect_builder.get_available_block_range_from_storage(),
effect_builder.get_block_synchronizer_status(),
effect_builder.get_latest_switch_block_header_from_storage()
);
let starting_state_root_hash = effect_builder
.get_block_header_at_height_from_storage(
available_block_range.low(),
true,
)
.await
.map(|header| *header.state_root_hash())
.unwrap_or_default();
let status_feed = StatusFeed::new(
last_added_block,
peers,
ChainspecInfo::new(network_name, next_upgrade),
consensus_status,
node_uptime.into(),
reactor_state,
last_progress.into_inner(),
available_block_range,
block_sync,
starting_state_root_hash,
latest_switch_block_header.map(|header| header.block_hash()),
);
responder.respond(status_feed).await;
}
}
.ignore(),
Event::RestRequest(RestRequest::Metrics { responder }) => effect_builder
.get_metrics()
.event(move |text| Event::GetMetricsResult {
text,
main_responder: responder,
}),
Event::GetMetricsResult {
text,
main_responder,
} => main_responder.respond(text).ignore(),
},
}
}
fn name(&self) -> &str {
COMPONENT_NAME
}
}
impl<REv> InitializedComponent<REv> for RestServer
where
REv: ReactorEventT,
{
fn state(&self) -> &ComponentState {
&self.state
}
fn set_state(&mut self, new_state: ComponentState) {
info!(
?new_state,
name = <Self as Component<MainEvent>>::name(self),
"component state changed"
);
self.state = new_state;
}
}
impl<REv> PortBoundComponent<REv> for RestServer
where
REv: ReactorEventT,
{
type Error = ListeningError;
type ComponentEvent = Event;
fn listen(
&mut self,
effect_builder: EffectBuilder<REv>,
) -> Result<Effects<Self::ComponentEvent>, Self::Error> {
let cfg = &self.config;
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let builder = utils::start_listening(&cfg.address)?;
let local_addr: Arc<OnceCell<SocketAddr>> = Default::default();
let server_join_handle = if cfg.cors_origin.is_empty() {
Some(tokio::spawn(http_server::run(
builder,
effect_builder,
self.api_version,
shutdown_receiver,
cfg.qps_limit,
local_addr.clone(),
)))
} else {
Some(tokio::spawn(http_server::run_with_cors(
builder,
effect_builder,
self.api_version,
shutdown_receiver,
cfg.qps_limit,
local_addr.clone(),
cfg.cors_origin.clone(),
)))
};
let network_name = self.network_name.clone();
self.inner_rest = Some(InnerRestServer {
local_addr,
shutdown_sender,
server_join_handle,
network_name,
});
Ok(Effects::new())
}
}
#[cfg(test)]
impl crate::reactor::Finalize for RestServer {
fn finalize(self) -> BoxFuture<'static, ()> {
async {
if let Some(mut rest_server) = self.inner_rest {
let _ = rest_server.shutdown_sender.send(());
if let Some(join_handle) = rest_server.server_join_handle.take() {
match join_handle.await {
Ok(_) => debug!("rest server exited cleanly"),
Err(error) => error!(%error, "could not join rest server task cleanly"),
}
} else {
warn!("rest server shutdown while already shut down")
}
} else {
info!("rest server was disabled in config, no shutdown performed")
}
}
.boxed()
}
}
#[cfg(test)]
mod schema_tests {
use crate::{testing::assert_schema, types::GetStatusResult};
use schemars::schema_for;
use super::{GetChainspecResult, GetValidatorChangesResult};
#[test]
fn json_schema_status_check() {
let schema_path = format!(
"{}/../resources/test/rest_schema_status.json",
env!("CARGO_MANIFEST_DIR")
);
let pretty = serde_json::to_string_pretty(&schema_for!(GetStatusResult)).unwrap();
assert_schema(schema_path, pretty);
}
#[test]
fn json_schema_validator_changes_check() {
let schema_path = format!(
"{}/../resources/test/rest_schema_validator_changes.json",
env!("CARGO_MANIFEST_DIR")
);
assert_schema(
schema_path,
serde_json::to_string_pretty(&schema_for!(GetValidatorChangesResult)).unwrap(),
);
}
#[test]
fn json_schema_chainspec_bytes_check() {
let schema_path = format!(
"{}/../resources/test/rest_schema_chainspec_bytes.json",
env!("CARGO_MANIFEST_DIR")
);
assert_schema(
schema_path,
serde_json::to_string_pretty(&schema_for!(GetChainspecResult)).unwrap(),
);
}
}