mod config;
mod event;
mod filters;
mod http_server;
use std::{convert::Infallible, fmt::Debug};
use datasize::DataSize;
use futures::{future::BoxFuture, join, FutureExt};
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{debug, error, warn};
use super::Component;
use crate::{
effect::{
requests::{ChainspecLoaderRequest, MetricsRequest, NetworkInfoRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
reactor::Finalize,
types::{NodeId, StatusFeed},
utils::{self, ListeningError},
NodeRng,
};
use crate::effect::requests::RestRequest;
pub use config::Config;
pub(crate) use event::Event;
/// Marker trait collecting every bound a reactor event type must satisfy to be used with the
/// REST server component.
///
/// A conforming type must be constructible from this component's own [`Event`] as well as from
/// each of the request types listed below, and it must be `Send`. The trait has no methods of
/// its own; it only exists so the bound list can be written once.
pub trait ReactorEventT
where
    Self: From<Event>
        + From<RestRequest<NodeId>>
        + From<NetworkInfoRequest<NodeId>>
        + From<StorageRequest>
        + From<ChainspecLoaderRequest>
        + From<MetricsRequest>
        + Send,
{
}
/// Blanket implementation: any `'static`, `Send` type that offers all of the required `From`
/// conversions is automatically a `ReactorEventT`.
impl<REv> ReactorEventT for REv
where
    REv: 'static
        + Send
        + From<Event>
        + From<RestRequest<NodeId>>
        + From<NetworkInfoRequest<NodeId>>
        + From<StorageRequest>
        + From<ChainspecLoaderRequest>
        + From<MetricsRequest>,
{
}
/// REST server component.
///
/// Holds the two handles needed to stop the server task that `RestServer::new` spawns: the
/// sender used to signal shutdown and the join handle of the spawned task.
#[derive(DataSize, Debug)]
pub(crate) struct RestServer {
/// Sending half of the oneshot channel whose receiving half is handed to `http_server::run`.
/// `finalize` sends `()` on it.
shutdown_sender: oneshot::Sender<()>,
/// Join handle of the task spawned in `new`; `finalize` takes it and awaits it.
server_join_handle: Option<JoinHandle<()>>,
}
impl RestServer {
    /// Creates the REST server component and spawns its HTTP server task.
    ///
    /// A oneshot channel is created for signalling shutdown, `utils::start_listening` is called
    /// with `config.address`, and the future returned by `http_server::run` is spawned onto the
    /// tokio runtime. The sender half of the channel and the task's join handle are stored in
    /// the returned component.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `utils::start_listening`; in that case no task is
    /// spawned.
    pub(crate) fn new<REv>(
        config: Config,
        effect_builder: EffectBuilder<REv>,
    ) -> Result<Self, ListeningError>
    where
        REv: ReactorEventT,
    {
        let (stop_tx, stop_rx) = oneshot::channel::<()>();
        let server_builder = utils::start_listening(&config.address)?;

        // Build the server future first, then hand it to the runtime.
        let server_task = http_server::run(server_builder, effect_builder, stop_rx);
        let join_handle = tokio::spawn(server_task);

        Ok(Self {
            shutdown_sender: stop_tx,
            server_join_handle: Some(join_handle),
        })
    }
}
impl<REv> Component<REv> for RestServer
where
    REv: ReactorEventT,
{
    type Event = Event;
    type ConstructionError = Infallible;

    /// Turns an incoming [`Event`] into the effects that answer it.
    ///
    /// * `RestRequest::GetStatus`: awaits the highest block, the network peers and the chainspec
    ///   info together via `join!`, builds a `StatusFeed` from them and passes it to the
    ///   responder.
    /// * `RestRequest::GetMetrics`: requests the metrics and maps the result into an
    ///   `Event::GetMetricsResult` that carries the original responder along.
    /// * `Event::GetMetricsResult`: passes the received text to the carried responder.
    fn handle_event(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        _rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event> {
        match event {
            Event::RestRequest(request) => match request {
                RestRequest::GetStatus { responder } => {
                    let answer_status = async move {
                        let (highest_block, connected_peers, chainspec) = join!(
                            effect_builder.get_highest_block(),
                            effect_builder.network_peers(),
                            effect_builder.get_chainspec_info()
                        );
                        responder
                            .respond(StatusFeed::new(highest_block, connected_peers, chainspec))
                            .await;
                    };
                    answer_status.ignore()
                }
                RestRequest::GetMetrics { responder } => {
                    let metrics_request = effect_builder.get_metrics();
                    // Keep the responder with the follow-up event so the reply can be sent once
                    // the metrics text arrives.
                    metrics_request.event(move |text| Event::GetMetricsResult {
                        text,
                        main_responder: responder,
                    })
                }
            },
            Event::GetMetricsResult {
                text,
                main_responder,
            } => {
                let reply = main_responder.respond(text);
                reply.ignore()
            }
        }
    }
}
impl Finalize for RestServer {
fn finalize(mut self) -> BoxFuture<'static, ()> {
async {
let _ = self.shutdown_sender.send(());
if let Some(join_handle) = self.server_join_handle.take() {
match join_handle.await {
Ok(_) => debug!("rest server exited cleanly"),
Err(error) => error!(%error, "could not join rest server task cleanly"),
}
} else {
warn!("rest server shutdown while already shut down")
}
}
.boxed()
}
}