mod config;
mod event;
mod filters;
mod http_server;
use std::{convert::Infallible, fmt::Debug, time::Instant};
use datasize::DataSize;
use futures::{future::BoxFuture, join, FutureExt};
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{debug, error, warn};
use casper_types::ProtocolVersion;
use super::Component;
use crate::{
effect::{
requests::{
ChainspecLoaderRequest, ConsensusRequest, MetricsRequest, NetworkInfoRequest,
StorageRequest,
},
EffectBuilder, EffectExt, Effects,
},
reactor::Finalize,
types::{NodeId, StatusFeed},
utils::{self, ListeningError},
NodeRng,
};
use crate::{components::rpc_server::rpcs::docs::OPEN_RPC_SCHEMA, effect::requests::RestRequest};
pub use config::Config;
pub(crate) use event::Event;
pub(crate) trait ReactorEventT:
From<Event>
+ From<RestRequest<NodeId>>
+ From<NetworkInfoRequest<NodeId>>
+ From<StorageRequest>
+ From<ChainspecLoaderRequest>
+ From<ConsensusRequest>
+ From<MetricsRequest>
+ Send
{
}
impl<REv> ReactorEventT for REv where
REv: From<Event>
+ From<RestRequest<NodeId>>
+ From<NetworkInfoRequest<NodeId>>
+ From<StorageRequest>
+ From<ChainspecLoaderRequest>
+ From<ConsensusRequest>
+ From<MetricsRequest>
+ Send
+ 'static
{
}
#[derive(DataSize, Debug)]
pub(crate) struct RestServer {
#[data_size(skip)]
shutdown_sender: oneshot::Sender<()>,
#[data_size(skip)]
server_join_handle: Option<JoinHandle<()>>,
node_startup_instant: Instant,
}
impl RestServer {
pub(crate) fn new<REv>(
config: Config,
effect_builder: EffectBuilder<REv>,
api_version: ProtocolVersion,
node_startup_instant: Instant,
) -> Result<Self, ListeningError>
where
REv: ReactorEventT,
{
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let builder = utils::start_listening(&config.address)?;
let server_join_handle = tokio::spawn(http_server::run(
builder,
effect_builder,
api_version,
shutdown_receiver,
config.qps_limit,
));
Ok(RestServer {
shutdown_sender,
server_join_handle: Some(server_join_handle),
node_startup_instant,
})
}
}
impl<REv> Component<REv> for RestServer
where
REv: ReactorEventT,
{
type Event = Event;
type ConstructionError = Infallible;
fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
_rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
match event {
Event::RestRequest(RestRequest::Status { responder }) => {
let node_uptime = self.node_startup_instant.elapsed();
async move {
let (last_added_block, peers, chainspec_info, consensus_status) = join!(
effect_builder.get_highest_block_from_storage(),
effect_builder.network_peers(),
effect_builder.get_chainspec_info(),
effect_builder.consensus_status()
);
let status_feed = StatusFeed::new(
last_added_block,
peers,
chainspec_info,
consensus_status,
node_uptime,
);
responder.respond(status_feed).await;
}
}
.ignore(),
Event::RestRequest(RestRequest::Metrics { responder }) => effect_builder
.get_metrics()
.event(move |text| Event::GetMetricsResult {
text,
main_responder: responder,
}),
Event::RestRequest(RestRequest::RpcSchema { responder }) => {
let schema = OPEN_RPC_SCHEMA.clone();
responder.respond(schema).ignore()
}
Event::GetMetricsResult {
text,
main_responder,
} => main_responder.respond(text).ignore(),
}
}
}
impl Finalize for RestServer {
fn finalize(mut self) -> BoxFuture<'static, ()> {
async {
let _ = self.shutdown_sender.send(());
if let Some(join_handle) = self.server_join_handle.take() {
match join_handle.await {
Ok(_) => debug!("rest server exited cleanly"),
Err(error) => error!(%error, "could not join rest server task cleanly"),
}
} else {
warn!("rest server shutdown while already shut down")
}
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use std::fs;
use assert_json_diff::assert_json_eq;
use schemars::{schema_for, JsonSchema};
use serde_json::Value;
use crate::{
rpcs::{docs::OpenRpcSchema, info::GetValidatorChangesResult},
types::GetStatusResult,
};
fn assert_schema<T: JsonSchema>(schema_path: String) {
let expected_schema = fs::read_to_string(schema_path).unwrap();
let expected_schema: Value = serde_json::from_str(&expected_schema).unwrap();
let actual_schema = schema_for!(T);
let actual_schema = serde_json::to_string_pretty(&actual_schema).unwrap();
let actual_schema: Value = serde_json::from_str(&actual_schema).unwrap();
assert_json_eq!(actual_schema, expected_schema);
}
#[test]
fn schema_status() {
let schema_path = format!(
"{}/../resources/test/rest_schema_status.json",
env!("CARGO_MANIFEST_DIR")
);
assert_schema::<GetStatusResult>(schema_path);
}
#[test]
fn schema_validator_changes() {
let schema_path = format!(
"{}/../resources/test/rest_schema_validator_changes.json",
env!("CARGO_MANIFEST_DIR")
);
assert_schema::<GetValidatorChangesResult>(schema_path);
}
#[test]
fn schema_rpc_schema() {
let schema_path = format!(
"{}/../resources/test/rest_schema_rpc_schema.json",
env!("CARGO_MANIFEST_DIR")
);
assert_schema::<OpenRpcSchema>(schema_path);
}
}