mod config;
mod event;
mod http_server;
mod sse_server;
use std::{convert::Infallible, fmt::Debug};
use datasize::DataSize;
use tokio::sync::mpsc::{self, UnboundedSender};
use super::Component;
use crate::{
effect::{EffectBuilder, Effects},
utils::{self, ListeningError},
NodeRng,
};
pub use config::Config;
pub(crate) use event::Event;
pub use sse_server::SseData;
pub trait ReactorEventT: From<Event> + Send {}
impl<REv> ReactorEventT for REv where REv: From<Event> + Send + 'static {}
#[derive(DataSize, Debug)]
pub(crate) struct EventStreamServer {
#[data_size(skip)]
sse_data_sender: UnboundedSender<SseData>,
}
impl EventStreamServer {
pub(crate) fn new<REv>(
config: Config,
_effect_builder: EffectBuilder<REv>,
) -> Result<Self, ListeningError>
where
REv: ReactorEventT,
{
let (sse_data_sender, sse_data_receiver) = mpsc::unbounded_channel();
let builder = utils::start_listening(&config.address)?;
tokio::spawn(http_server::run(config, builder, sse_data_receiver));
Ok(EventStreamServer { sse_data_sender })
}
fn broadcast(&mut self, sse_data: SseData) -> Effects<Event> {
let _ = self.sse_data_sender.send(sse_data);
Effects::new()
}
}
impl<REv> Component<REv> for EventStreamServer
where
REv: ReactorEventT,
{
type Event = Event;
type ConstructionError = Infallible;
fn handle_event(
&mut self,
_effect_builder: EffectBuilder<REv>,
_rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
match event {
Event::BlockAdded {
block_hash,
block_header,
} => self.broadcast(SseData::BlockAdded {
block_hash,
block_header: Box::new(*block_header),
}),
Event::DeployProcessed {
deploy_hash,
deploy_header,
block_hash,
execution_result,
} => self.broadcast(SseData::DeployProcessed {
deploy_hash: Box::new(deploy_hash),
account: *deploy_header.account(),
timestamp: deploy_header.timestamp(),
ttl: deploy_header.ttl(),
dependencies: deploy_header.dependencies().clone(),
block_hash: Box::new(block_hash),
execution_result,
}),
Event::Fault {
era_id,
public_key,
timestamp,
} => self.broadcast(SseData::Fault {
era_id,
public_key,
timestamp,
}),
Event::FinalitySignature(fs) => self.broadcast(SseData::FinalitySignature(fs)),
}
}
}