use datasize::DataSize;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use tracing::{error, trace};
use warp::{
filters::BoxedFilter,
sse::{self, ServerSentEvent as WarpServerSentEvent},
Filter, Reply,
};
use super::CLIENT_API_VERSION;
use crate::{
crypto::asymmetric_key::PublicKey,
types::{
json_compatibility::ExecutionResult, BlockHash, BlockHeader, DeployHash, FinalizedBlock,
TimeDiff, Timestamp,
},
};
/// URL path segment under which the event-stream filter is served; it is the
/// argument to `warp::path` in `create_channels_and_filter`.
pub const SSE_API_PATH: &str = "events";
/// Capacity handed to `broadcast::channel` when the channel used to push
/// ongoing events to every connected client is created.
///
/// Per the tokio documentation, a receiver that falls further behind than
/// this many messages gets a `Lagged` error; `stream_to_client` forwards any
/// receive error into the client's stream as an `Err` item.
const BROADCAST_CHANNEL_SIZE: usize = 10;
lazy_static! {
/// An event with no ID whose payload is a clone of `CLIENT_API_VERSION`.
///
/// NOTE(review): the name suggests this is the first event handed to every
/// new subscriber; the code that sends it is outside this file section, so
/// confirm against the caller.
pub(super) static ref SSE_INITIAL_EVENT: ServerSentEvent = ServerSentEvent {
id: None,
data: SseData::ApiVersion(CLIENT_API_VERSION.clone())
};
}
/// Numeric identifier of an event.  Used for `ServerSentEvent::id`, for the
/// `start_from` query parameter, and as the value passed to `sse::id` when an
/// event is written to a client.
type Id = u32;
/// Payload of an event on the event stream.
///
/// `stream_to_client` serializes the value with `sse::json`, so the serde
/// representation of this enum is what clients receive as the event's data.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug, DataSize)]
pub enum SseData {
/// The API version, as a semver `Version`.
///
/// This is the only variant `stream_to_client` accepts without an event ID
/// (and it is rejected there if it *does* carry one).  The payload is
/// skipped by the `DataSize` derive.
#[data_size(skip)]
ApiVersion(Version),
/// Carries a `FinalizedBlock`.
BlockFinalized(FinalizedBlock),
/// Carries the hash and header of a block.
BlockAdded {
block_hash: BlockHash,
block_header: BlockHeader,
},
/// Carries the details of a deploy together with the hash of a block and
/// an execution result.
///
/// NOTE(review): presumably `block_hash` is the block the deploy was
/// executed in; that is not established by this file section.
DeployProcessed {
deploy_hash: DeployHash,
account: PublicKey,
timestamp: Timestamp,
ttl: TimeDiff,
dependencies: Vec<DeployHash>,
block_hash: BlockHash,
execution_result: ExecutionResult,
},
}
/// A single event as handled internally: an optional ID plus its payload.
///
/// Not to be confused with warp's `ServerSentEvent` trait, which this file
/// imports under the alias `WarpServerSentEvent`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(super) struct ServerSentEvent {
/// Event ID.  `stream_to_client` requires this to be `None` exactly when
/// `data` is `SseData::ApiVersion`; any other combination hits an
/// `unreachable!` there.
pub(super) id: Option<Id>,
/// The payload, sent to the client as JSON.
pub(super) data: SseData,
}
/// Message type carried by the broadcast channel whose sender is returned
/// from `create_channels_and_filter` and to which every client stream
/// subscribes.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(super) enum BroadcastChannelMessage {
/// An event to be forwarded to the client.
ServerSentEvent(ServerSentEvent),
/// Signals that the stream should end: `stream_to_client` turns this into
/// an `Err(broadcast::RecvError::Closed)` item.
Shutdown,
}
/// Sent over the unbounded channel returned by `create_channels_and_filter`
/// once for every request the filter handles.
pub(super) struct NewSubscriberInfo {
/// The `start_from` value from the request's query string, if any.
pub(super) start_from: Option<Id>,
/// Sending half of a per-request channel.  Events sent here are delivered
/// to the client ahead of anything received from the broadcast channel
/// (see the `chain` call in `stream_to_client`).
pub(super) initial_events_sender: mpsc::UnboundedSender<ServerSentEvent>,
}
/// The request's query string as deserialized by `warp::query`, e.g.
/// `events?start_from=999`.
#[derive(Deserialize, Debug)]
struct Query {
/// Optional event ID; copied verbatim into `NewSubscriberInfo::start_from`.
start_from: Option<Id>,
}
/// Builds the channels and the warp filter that together serve the event
/// stream.
///
/// Returns a tuple of:
///
/// * the sender of a broadcast channel (capacity `BROADCAST_CHANNEL_SIZE`);
///   every message sent on it reaches each currently-subscribed client
///   stream,
/// * the receiver of an unbounded channel on which one `NewSubscriberInfo`
///   arrives for every request handled by the filter,
/// * the boxed filter itself, matching `GET` requests on `SSE_API_PATH` whose
///   query string deserializes into `Query`.
pub(super) fn create_channels_and_filter() -> (
    broadcast::Sender<BroadcastChannelMessage>,
    mpsc::UnboundedReceiver<NewSubscriberInfo>,
    BoxedFilter<(impl Reply,)>,
) {
    // Channel used to announce each new request, along with the means to feed
    // it its initial events.
    let (subscriber_info_tx, subscriber_info_rx) = mpsc::unbounded_channel();

    // Channel used to fan ongoing events out to all client streams.  The
    // receiver created here is discarded; each request subscribes afresh
    // through the sender clone moved into the handler below.
    let (broadcaster, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
    let broadcaster_for_handler = broadcaster.clone();

    // Runs once per matching request.
    let handle_request = move |query: Query| {
        // Per-request channel carrying the events to be sent before any
        // broadcast ones.
        let (initial_tx, initial_rx) = mpsc::unbounded_channel();

        // Hand the sending half and the requested starting point to whoever
        // owns the other end of the subscriber-info channel.  A failure is
        // only logged; the reply is still produced.
        let delivered = subscriber_info_tx
            .send(NewSubscriberInfo {
                start_from: query.start_from,
                initial_events_sender: initial_tx,
            })
            .is_ok();
        if !delivered {
            error!("failed to send new subscriber info");
        }

        // Subscribe to ongoing events only after the announcement above.
        let ongoing_rx = broadcaster_for_handler.subscribe();

        let events = stream_to_client(initial_rx, ongoing_rx);
        sse::reply(sse::keep_alive().stream(events))
    };

    let filter = warp::get()
        .and(warp::path(SSE_API_PATH))
        .and(warp::query().map(handle_request))
        .boxed();

    (broadcaster, subscriber_info_rx, filter)
}
fn stream_to_client(
initial_events: mpsc::UnboundedReceiver<ServerSentEvent>,
ongoing_events: broadcast::Receiver<BroadcastChannelMessage>,
) -> impl Stream<Item = Result<impl WarpServerSentEvent, broadcast::RecvError>> + 'static {
initial_events
.map(|event| Ok(BroadcastChannelMessage::ServerSentEvent(event)))
.chain(ongoing_events)
.map(|result| {
trace!(?result);
match result? {
BroadcastChannelMessage::ServerSentEvent(event) => match (event.id, &event.data) {
(None, &SseData::ApiVersion { .. }) => Ok(sse::json(event.data).boxed()),
(Some(id), &SseData::BlockFinalized { .. })
| (Some(id), &SseData::BlockAdded { .. })
| (Some(id), &SseData::DeployProcessed { .. }) => {
Ok((sse::id(id), sse::json(event.data)).boxed())
}
_ => unreachable!("only ApiVersion may have no event ID"),
},
BroadcastChannelMessage::Shutdown => Err(broadcast::RecvError::Closed),
}
})
}