mod config;
mod deploy_getter;
mod event;
mod event_indexer;
mod http_server;
mod sse_server;
#[cfg(test)]
mod tests;
use std::{convert::Infallible, fmt::Debug, net::SocketAddr, path::PathBuf};
use datasize::DataSize;
use tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
};
use tracing::{info, warn};
use casper_types::ProtocolVersion;
use super::Component;
use crate::{
effect::{EffectBuilder, Effects},
reactor::participating::ParticipatingEvent as ParticipatingReactorEvent,
types::JsonBlock,
utils::{self, ListeningError},
NodeRng,
};
pub use config::Config;
pub(crate) use deploy_getter::DeployGetter;
pub(crate) use event::Event;
use event_indexer::{EventIndex, EventIndexer};
use sse_server::ChannelsAndFilter;
pub(crate) use sse_server::SseData;
/// Extra capacity, expressed as a percentage of `config.event_stream_buffer_length`, that
/// `EventStreamServer::new` adds when sizing the broadcast channel it hands to
/// `ChannelsAndFilter::new`. With the value 20 the channel is sized at 120% of the configured
/// buffer length.
const ADDITIONAL_PERCENT_FOR_BROADCAST_CHANNEL_SIZE: u32 = 20;
/// Marker trait collecting the bounds a reactor event type has to satisfy to be used with
/// [`EventStreamServer`]: it must be constructible from this component's [`Event`] and be `Send`.
pub trait ReactorEventT: From<Event> + Send {}

/// Blanket implementation: every `'static` type that is `From<Event> + Send` is automatically a
/// `ReactorEventT`, so reactors never implement the trait by hand.
impl<REv: From<Event> + Send + 'static> ReactorEventT for REv {}
/// Component that forwards node events to the event stream (SSE) HTTP server.
///
/// The component itself only assigns an index to each outgoing `SseData` item and pushes it onto
/// an unbounded channel; the HTTP server task spawned in `EventStreamServer::new` holds the
/// receiving end of that channel.
#[derive(DataSize, Debug)]
pub(crate) struct EventStreamServer {
/// Sending half of the unbounded channel whose receiver is passed to `http_server::run`.
// Excluded from the derived `DataSize` accounting.
// NOTE(review): presumably because `UnboundedSender` has no `DataSize` impl - confirm.
#[data_size(skip)]
sse_data_sender: UnboundedSender<(EventIndex, SseData)>,
/// Source of the index attached to every broadcast item; built from the storage path.
event_indexer: EventIndexer,
/// Address actually bound by the server, as reported by warp when binding succeeded.
listening_address: SocketAddr,
/// Handle shared (via `clone`) with the SSE filter; receives the participating reactor's effect
/// builder through `set_participating_effect_builder`.
deploy_getter: DeployGetter,
}
impl EventStreamServer {
    /// Creates the component and starts the event stream HTTP server.
    ///
    /// The steps are:
    /// 1. resolve `config.address` to a socket address,
    /// 2. create the event indexer (from `storage_path`) and the unbounded channel used to hand
    ///    SSE data to the server task,
    /// 3. build the broadcast channels and the warp filter via `ChannelsAndFilter::new`,
    /// 4. bind the warp server with graceful shutdown driven by a oneshot channel,
    /// 5. spawn `http_server::run` with the receiving ends of all channels.
    ///
    /// Because it calls `tokio::spawn`, this must be invoked from within a Tokio runtime context.
    ///
    /// # Errors
    ///
    /// * `ListeningError::ResolveAddress` if `config.address` cannot be resolved (a warning is
    ///   logged as well).
    /// * `ListeningError::Listen` if binding to the resolved address fails.
    pub(crate) fn new(
        config: Config,
        storage_path: PathBuf,
        api_version: ProtocolVersion,
        deploy_getter: DeployGetter,
    ) -> Result<Self, ListeningError> {
        let required_address = utils::resolve_address(&config.address).map_err(|error| {
            warn!(
                %error,
                address=%config.address,
                "failed to start event stream server, cannot parse address"
            );
            ListeningError::ResolveAddress(error)
        })?;

        let event_indexer = EventIndexer::new(storage_path);
        let (sse_data_sender, sse_data_receiver) = mpsc::unbounded_channel();

        // Scale the configured buffer length by (100 + extra)%.  The product is computed in `u64`
        // because in `u32` it overflows for buffer lengths above roughly 35.7 million, which
        // panics in debug builds and silently wraps to a too-small channel in release builds.
        let broadcast_channel_size = u64::from(config.event_stream_buffer_length)
            * u64::from(100 + ADDITIONAL_PERCENT_FOR_BROADCAST_CHANNEL_SIZE)
            / 100;

        let ChannelsAndFilter {
            event_broadcaster,
            new_subscriber_info_receiver,
            sse_filter,
        } = ChannelsAndFilter::new(
            // Lossless on 64-bit targets: the value is at most `u32::MAX * 120 / 100`.
            broadcast_channel_size as usize,
            config.max_concurrent_subscribers,
            deploy_getter.clone(),
        );

        // Completing (or dropping) `shutdown_sender` resolves the future below, which tells warp
        // to shut down gracefully.
        let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
        let (listening_address, server_with_shutdown) = warp::serve(sse_filter)
            .try_bind_with_graceful_shutdown(required_address, async {
                shutdown_receiver.await.ok();
            })
            .map_err(|error| ListeningError::Listen {
                address: required_address,
                error: Box::new(error),
            })?;
        info!(address=%listening_address, "started event stream server");

        tokio::spawn(http_server::run(
            config,
            api_version,
            server_with_shutdown,
            shutdown_sender,
            sse_data_receiver,
            event_broadcaster,
            new_subscriber_info_receiver,
        ));

        Ok(EventStreamServer {
            sse_data_sender,
            event_indexer,
            listening_address,
            deploy_getter,
        })
    }

    /// Forwards the participating reactor's effect builder to the deploy getter.
    pub(crate) fn set_participating_effect_builder(
        &self,
        effect_builder: EffectBuilder<ParticipatingReactorEvent>,
    ) {
        self.deploy_getter
            .set_participating_effect_builder(effect_builder);
    }

    /// Tags `sse_data` with the next event index and queues it for the HTTP server task.
    ///
    /// Always returns an empty set of effects.
    fn broadcast(&mut self, sse_data: SseData) -> Effects<Event> {
        let event_index = self.event_indexer.next_index();
        // Best effort: a failed send is deliberately ignored (per the tokio docs, `send` on an
        // unbounded channel only fails once the receiving half has been closed or dropped).
        let _ = self.sse_data_sender.send((event_index, sse_data));
        Effects::new()
    }
}
impl<REv> Component<REv> for EventStreamServer
where
    REv: ReactorEventT,
{
    type Event = Event;
    type ConstructionError = Infallible;

    /// Converts a component event into its SSE representation and queues it for broadcast.
    ///
    /// Every variant maps to exactly one `SseData` item, except `Event::DeploysExpired`, which
    /// produces one `SseData::DeployExpired` per contained deploy hash.  Neither the effect
    /// builder nor the RNG is used.
    fn handle_event(
        &mut self,
        _effect_builder: EffectBuilder<REv>,
        _rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event> {
        let sse_data = match event {
            // The only one-to-many case: broadcast each expired hash separately and merge the
            // resulting effects.
            Event::DeploysExpired(deploy_hashes) => {
                return deploy_hashes
                    .into_iter()
                    .map(|deploy_hash| SseData::DeployExpired { deploy_hash })
                    .flat_map(|expired| self.broadcast(expired))
                    .collect();
            }
            Event::BlockAdded(block) => {
                // Copy the hash out before the block is moved into its JSON wrapper.
                let block_hash = *block.hash();
                let block = Box::new(JsonBlock::new(*block, None));
                SseData::BlockAdded { block_hash, block }
            }
            Event::DeployAccepted(deploy) => SseData::DeployAccepted { deploy },
            Event::DeployProcessed {
                deploy_hash,
                deploy_header,
                block_hash,
                execution_result,
            } => SseData::DeployProcessed {
                deploy_hash: Box::new(deploy_hash),
                account: Box::new(deploy_header.account().clone()),
                timestamp: deploy_header.timestamp(),
                ttl: deploy_header.ttl(),
                dependencies: deploy_header.dependencies().clone(),
                block_hash: Box::new(block_hash),
                execution_result,
            },
            Event::Fault {
                era_id,
                public_key,
                timestamp,
            } => SseData::Fault {
                era_id,
                public_key,
                timestamp,
            },
            Event::FinalitySignature(fs) => SseData::FinalitySignature(fs),
            Event::Step {
                era_id,
                execution_effect,
            } => SseData::Step {
                era_id,
                execution_effect,
            },
        };

        self.broadcast(sse_data)
    }
}