use std::convert::Infallible;
use futures::{
future::{self, select},
FutureExt,
};
use hyper::Server;
use tokio::{
select,
sync::{mpsc, oneshot},
};
use tracing::{debug, info, trace, warn};
use warp::Filter;
use wheelbuf::WheelBuf;
use super::{
rest_server,
rpcs::{self, RpcWithOptionalParamsExt, RpcWithParamsExt, RpcWithoutParamsExt},
sse_server::{self, BroadcastChannelMessage, ServerSentEvent, SSE_INITIAL_EVENT},
Config, ReactorEventT, SseData,
};
use crate::{effect::EffectBuilder, utils};
pub(super) async fn run<REv: ReactorEventT>(
config: Config,
effect_builder: EffectBuilder<REv>,
mut data_receiver: mpsc::UnboundedReceiver<SseData>,
) {
let rest_status = rest_server::create_status_filter(effect_builder);
let rest_metrics = rest_server::create_metrics_filter(effect_builder);
let rpc_put_deploy = rpcs::account::PutDeploy::create_filter(effect_builder);
let rpc_get_block = rpcs::chain::GetBlock::create_filter(effect_builder);
let rpc_get_state_root_hash = rpcs::chain::GetStateRootHash::create_filter(effect_builder);
let rpc_get_item = rpcs::state::GetItem::create_filter(effect_builder);
let rpc_get_balance = rpcs::state::GetBalance::create_filter(effect_builder);
let rpc_get_deploy = rpcs::info::GetDeploy::create_filter(effect_builder);
let rpc_get_peers = rpcs::info::GetPeers::create_filter(effect_builder);
let rpc_get_status = rpcs::info::GetStatus::create_filter(effect_builder);
let rpc_get_auction_info = rpcs::state::GetAuctionInfo::create_filter(effect_builder);
let (broadcaster, mut new_subscriber_info_receiver, sse_filter) =
sse_server::create_channels_and_filter();
let service = warp_json_rpc::service(
rest_status
.or(rest_metrics)
.or(rpc_put_deploy)
.or(rpc_get_block)
.or(rpc_get_state_root_hash)
.or(rpc_get_item)
.or(rpc_get_balance)
.or(rpc_get_deploy)
.or(rpc_get_peers)
.or(rpc_get_status)
.or(rpc_get_auction_info)
.or(sse_filter),
);
let mut server_address = match utils::resolve_address(&config.address) {
Ok(address) => address,
Err(error) => {
warn!(%error, "failed to start HTTP server, cannot parse address");
return;
}
};
let builder = loop {
match Server::try_bind(&server_address) {
Ok(builder) => {
break builder;
}
Err(error) => {
if server_address.port() == 0 {
warn!(%error, "failed to start HTTP server");
return;
} else {
server_address.set_port(0);
debug!(%error, "failed to start HTTP server. retrying on random port");
}
}
}
};
let make_svc =
hyper::service::make_service_fn(move |_| future::ok::<_, Infallible>(service.clone()));
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let server = builder.serve(make_svc);
info!(address = %server.local_addr(), "started HTTP server");
let server_with_shutdown = server.with_graceful_shutdown(async {
shutdown_receiver.await.ok();
});
let server_joiner = tokio::spawn(server_with_shutdown);
let mut event_index = 0_u32;
let mut buffer = WheelBuf::new(vec![
SSE_INITIAL_EVENT.clone();
config.event_stream_buffer_length as usize
]);
let event_stream_fut = async {
loop {
select! {
maybe_new_subscriber = new_subscriber_info_receiver.recv() => {
if let Some(subscriber) = maybe_new_subscriber {
let _ = subscriber.initial_events_sender.send(SSE_INITIAL_EVENT.clone());
if let Some(start_index) = subscriber.start_from {
for event in buffer
.iter()
.skip_while(|event| event.id.unwrap() < start_index)
{
let _ = subscriber.initial_events_sender.send(event.clone());
}
}
}
}
maybe_data = data_receiver.recv() => {
match maybe_data {
Some(data) => {
trace!("HTTP server received {:?}", data);
let event = ServerSentEvent { id: Some(event_index), data };
buffer.push(event.clone());
let message = BroadcastChannelMessage::ServerSentEvent(event);
let _ = broadcaster.send(message);
event_index = event_index.wrapping_add(1);
}
None => {
info!("shutting down HTTP server");
break;
}
}
}
}
}
};
let _ = select(server_joiner, event_stream_fut.boxed()).await;
let _ = broadcaster.send(BroadcastChannelMessage::Shutdown);
let _ = shutdown_sender.send(());
trace!("HTTP server stopped");
}